package org.databandtech.mockmq;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.archivers.zip.Zip64Mode;
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipParameters;
import org.apache.commons.compress.compressors.gzip.GzipUtils;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.databandtech.mockmq.entity.EpgVod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.common.io.Files;
import com.google.common.io.LineProcessor;

/*
 * Watches a directory for already-decompressed log files (.cdr/.CDR/.json)
 * and sends every line of each new file to a Kafka topic. Decompressing the
 * source .gz archives is handled by a separate process.
 */
public class JsonFileReader {

	private static Logger logger = LoggerFactory.getLogger(JsonFileReader.class);

	// The "%d" placeholder is required so each pool thread gets a unique,
	// numbered name; without it every thread would be named identically.
	static ThreadFactory namedThreadFactory = new BasicThreadFactory.Builder().namingPattern("snmLogRead-%d").build();

	private static ExecutorService pool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS,
			new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

	public static ExecutorService fixedExecutorService = Executors.newFixedThreadPool(2);

	public static String CHARSET_UTF8 = "UTF-8";
	// Directory scanned for newly decompressed files. (Doubled "\\\\" in the
	// original collapsed to the same path; written consistently with the
	// other path literals here.)
	public static String monitoringDir = "D:\\#SNM\\#实时大数据\\";
	public static String ftpOutputDir = "D:/home/";
	final static String HOST = "192.168.13.115:9092";// "192.168.10.60:9092"
	final static String TOPIC = "iptvlog";
	final static int PARTITION = 0; // partition

	/**
	 * Entry point: scans {@link #monitoringDir} once for .cdr/.CDR/.json files
	 * and streams each file's lines to the {@link #TOPIC} Kafka topic,
	 * printing a per-file timing/volume summary.
	 *
	 * @param args unused
	 * @throws IOException if a file cannot be read
	 */
	public static void main(String[] args) throws IOException {

		// 1. Find decompressed files under the monitored directory (the .gz
		// decompression step runs in another process, e.g.
		// CompressFileUtils.decompressByGZip). listFiles returns a
		// Collection<File>; copy defensively instead of casting to List.
		List<File> fileList = new ArrayList<>(
				FileUtils.listFiles(new File(monitoringDir), new String[] { "cdr", "CDR", "json" }, true));
		List<File> fileProcessedList = new ArrayList<File>();

		fileList.forEach(FileUtil::showFilename);

		// 2. Produce each file's lines to Kafka. try-with-resources closes the
		// producer, which flushes buffered records and releases its sockets.
		try (org.apache.kafka.clients.producer.KafkaProducer<String, String> kafkaProducer =
				new org.apache.kafka.clients.producer.KafkaProducer<String, String>(buildProducerConfig())) {

			for (File file : fileList) { // handle multiple files
				// Only process files not yet seen in this run.
				if (!fileProcessedList.contains(file)) {
					sendFileLines(kafkaProducer, file);
					fileProcessedList.add(file);
				}
			}
		}
	}

	/** Builds the Kafka producer configuration used by {@link #main}. */
	private static Properties buildProducerConfig() {
		Properties properties = new Properties();
		properties.put("bootstrap.servers", HOST);
		// acks: 0 = producer does not wait for a broker ack,
		// 1 = leader acks after it has the record,
		// -1 = ack only after all in-sync followers have replicated it.
		properties.put("acks", "-1");
		properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		return properties;
	}

	/**
	 * Streams every line of {@code file} to the Kafka topic and prints a
	 * timing/volume summary. A LineIterator reads the file incrementally, so
	 * arbitrarily large files never need to be loaded fully into memory
	 * (unlike FileUtils.readLines / Guava Files.readLines).
	 *
	 * @param kafkaProducer producer used to send each line
	 * @param file          file whose lines are sent
	 * @throws IOException if the file cannot be opened or read
	 */
	private static void sendFileLines(
			org.apache.kafka.clients.producer.KafkaProducer<String, String> kafkaProducer, File file)
			throws IOException {
		long starttime = System.currentTimeMillis();
		int count = 0;
		long sumByte = 0L;

		LineIterator it = FileUtils.lineIterator(file, CHARSET_UTF8);
		try {
			while (it.hasNext()) {
				String lineStr = it.nextLine();
				KafkaUtils.send(kafkaProducer, TOPIC, lineStr);
				sumByte = sumByte + lineStr.getBytes(StandardCharsets.UTF_8).length;
				count++;
			}
		} finally {
			it.close();
		}

		long duration = System.currentTimeMillis() - starttime;
		System.out.println("完成。总计时长： " + duration + ",总行数： " + count + ",总字节： " + sumByte);
	}

}
